use futures::StreamExt;
use tauri::Emitter;
use crate::ctp_commands::ctp::state::*;
use crate::ctp_commands::ctp::types::{InstrumentInfo, OrderInfo};

// 启动Trader Stream处理任务
pub fn start_trader_stream_processing(session_id: String) {
    use ctp_trade_lib::trader_api::CThostFtdcTraderSpiOutput;

    tokio::spawn(async move {
        println!("🔄 [STREAM] Starting trader stream processing for session: {}", session_id);

        // 获取Stream的所有权
        let stream_opt = {
            let mut streams = get_trader_streams().lock().unwrap();
            streams.remove(&session_id)
        };

        if let Some(mut stream) = stream_opt {
            // 添加超时和错误处理
            let mut error_count = 0;
            const MAX_ERRORS: usize = 10;

            while let Some(event) = stream.next().await {
                // 使用 catch_unwind 捕获可能的 panic
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    match event {
                        CThostFtdcTraderSpiOutput::OnRspQryInstrument(packet) => {
                            if let Some(instrument) = &packet.p_instrument {
                                handle_ctp_instrument_callback(instrument, packet.b_is_last);
                            } else if packet.b_is_last {
                                // 即使没有合约数据，如果是最后一个包，也要标记完成
                                let mut query_complete = get_instruments_query_complete().lock().unwrap();
                                *query_complete = true;
                                println!("✅ [STREAM] Instrument query completed (no data)");
                            }
                        },
                        CThostFtdcTraderSpiOutput::OnFrontConnected(_) => {
                            println!("🔗 [STREAM] Trader front connected");
                            
                            // 发送连接成功事件到前端
                            let app_handle_opt = {
                                let handle = get_app_handle().lock().unwrap();
                                handle.clone()
                            };
                            
                            if let Some(app_handle) = app_handle_opt {
                                let connection_data = serde_json::json!({
                                    "type": "trader",
                                    "status": "connected",
                                    "session_id": session_id.clone(),
                                    "timestamp": chrono::Local::now().to_rfc3339()
                                });
                                let _ = app_handle.emit("connection_status_change", &connection_data);
                                println!("📤 [STREAM] 已发送交易连接状态事件: connected");
                            }
                        },
                        CThostFtdcTraderSpiOutput::OnFrontDisconnected(packet) => {
                            let reason_text = match packet.n_reason {
                                0x1001 => "网络读失败",
                                0x1002 => "网络写失败",
                                0x2001 => "接收心跳超时",
                                0x2002 => "发送心跳失败",
                                0x2003 => "收到错误报文",
                                _ => "未知错误"
                            };
                            println!("❌ [STREAM] Trader front disconnected, reason: {} ({})", packet.n_reason, reason_text);

                            // 检查是否已经登录成功
                            let was_logged_in = {
                                let login_status = get_trader_login_status().lock().unwrap();
                                login_status.get(&session_id).copied().unwrap_or(false)
                            };
                            
                            // 更新登录状态为断开
                            {
                                let mut login_status = get_trader_login_status().lock().unwrap();
                                login_status.insert(session_id.clone(), false);
                            }
                            
                            // 只有在已登录成功后的断线才发送事件到前端
                            if was_logged_in {
                                let app_handle_opt = {
                                    let handle = get_app_handle().lock().unwrap();
                                    handle.clone()
                                };
                                
                                if let Some(app_handle) = app_handle_opt {
                                    let disconnection_data = serde_json::json!({
                                        "type": "trader",
                                        "status": "disconnected",
                                        "reason_code": packet.n_reason,
                                        "reason_text": reason_text,
                                        "session_id": session_id.clone(),
                                        "timestamp": chrono::Local::now().to_rfc3339()
                                    });
                                    let _ = app_handle.emit("connection_status_change", &disconnection_data);
                                    println!("📤 [STREAM] 已发送交易断线事件（已登录后断线）: {} ({})", packet.n_reason, reason_text);
                                }
                            } else {
                                println!("ℹ️ [STREAM] 登录过程中的断线，不发送断线事件");
                            }
                        },
                        CThostFtdcTraderSpiOutput::OnRspUserLogin(packet) => {
                            handle_trader_login_response(&session_id, packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspOrderInsert(packet) => {
                            handle_order_insert_response(packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspOrderAction(packet) => {
                            handle_order_action_response(packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRtnOrder(packet) => {
                            handle_order_status_update(packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRtnTrade(packet) => {
                            handle_trade_notification(packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspQryOrder(packet) => {
                            handle_order_query_response(&session_id, packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspSettlementInfoConfirm(packet) => {
                            handle_settlement_confirm_response(&session_id, packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspQryInvestorPosition(packet) => {
                            handle_position_query_response(packet);
                        },
                        CThostFtdcTraderSpiOutput::OnRspQryTradingAccount(packet) => {
                            handle_account_query_response(packet);
                        },
                        _ => {
                            // 处理其他事件
                        }
                    }
                }));

                // 处理 panic 结果
                match result {
                    Ok(_) => {
                        // 重置错误计数
                        error_count = 0;
                    },
                    Err(_) => {
                        error_count += 1;
                        println!("❌ [STREAM] Event processing panicked for session: {} (error count: {})", session_id, error_count);

                        if error_count >= MAX_ERRORS {
                            println!("❌ [STREAM] Too many errors, stopping stream processing for session: {}", session_id);
                            break;
                        }

                        // 短暂延迟后继续
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }
            }
        }

        println!("🔄 [STREAM] Trader stream processing ended for session: {}", session_id);
    });
}

// 启动MD Stream处理任务
pub fn start_md_stream_processing(session_id: String) {
    use ctp_trade_lib::md_api::CThostFtdcMdSpiOutput;

    tokio::spawn(async move {
        // 获取Stream的所有权
        let stream_opt = {
            let mut streams = get_md_streams().lock().unwrap();
            streams.remove(&session_id)
        };

        if let Some(mut stream) = stream_opt {
            // 添加超时和错误处理
            let mut error_count = 0;
            const MAX_ERRORS: usize = 10;

            while let Some(event) = stream.next().await {
                // 使用 catch_unwind 捕获可能的 panic
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    match event {
                        CThostFtdcMdSpiOutput::OnRtnDepthMarketData(packet) => {
                            if let Some(market_data) = &packet.p_depth_market_data {
                                handle_ctp_market_data_callback(market_data, &session_id);
                            }
                        },
                        CThostFtdcMdSpiOutput::OnFrontConnected(_) => {
                            println!("🔗 [STREAM] MD front connected");
                            
                            // 发送行情连接成功事件到前端
                            let app_handle_opt = {
                                let handle = get_app_handle().lock().unwrap();
                                handle.clone()
                            };
                            
                            if let Some(app_handle) = app_handle_opt {
                                let connection_data = serde_json::json!({
                                    "type": "md",
                                    "status": "connected",
                                    "session_id": session_id.clone(),
                                    "timestamp": chrono::Local::now().to_rfc3339()
                                });
                                let _ = app_handle.emit("connection_status_change", &connection_data);
                                println!("📤 [STREAM] 已发送行情连接状态事件: connected");
                            }
                        },
                        CThostFtdcMdSpiOutput::OnFrontDisconnected(packet) => {
                            let reason_text = match packet.n_reason {
                                0x1001 => "网络读失败",
                                0x1002 => "网络写失败",
                                0x2001 => "接收心跳超时",
                                0x2002 => "发送心跳失败",
                                0x2003 => "收到错误报文",
                                _ => "未知错误"
                            };
                            println!("❌ [STREAM] MD front disconnected, reason: {} ({})", packet.n_reason, reason_text);
                            
                            // 检查是否已经登录成功
                            let was_logged_in = {
                                let login_status = get_md_login_status().lock().unwrap();
                                login_status.get(&session_id).copied().unwrap_or(false)
                            };
                            
                            // 更新登录状态为断开
                            {
                                let mut login_status = get_md_login_status().lock().unwrap();
                                login_status.insert(session_id.clone(), false);
                            }
                            
                            // 只有在已登录成功后的断线才发送事件到前端
                            if was_logged_in {
                                let app_handle_opt = {
                                    let handle = get_app_handle().lock().unwrap();
                                    handle.clone()
                                };
                                
                                if let Some(app_handle) = app_handle_opt {
                                    let disconnection_data = serde_json::json!({
                                        "type": "md",
                                        "status": "disconnected",
                                        "reason_code": packet.n_reason,
                                        "reason_text": reason_text,
                                        "session_id": session_id.clone(),
                                        "timestamp": chrono::Local::now().to_rfc3339()
                                    });
                                    let _ = app_handle.emit("connection_status_change", &disconnection_data);
                                    println!("📤 [STREAM] 已发送行情断线事件（已登录后断线）: {} ({})", packet.n_reason, reason_text);
                                }
                            } else {
                                println!("ℹ️ [STREAM] 登录过程中的断线，不发送断线事件");
                            }
                        },
                        CThostFtdcMdSpiOutput::OnRspUserLogin(packet) => {
                            handle_md_login_response(&session_id, packet);
                        },
                        _ => {
                            // 处理其他事件
                        }
                    }
                }));

                // 处理 panic 结果
                match result {
                    Ok(_) => {
                        // 重置错误计数
                        error_count = 0;
                    },
                    Err(_) => {
                        error_count += 1;

                        if error_count >= MAX_ERRORS {
                            break;
                        }

                        // 短暂延迟后继续
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }
            }
        }
    });
}

// 处理交易登录响应
fn handle_trader_login_response(session_id: &str, packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspUserLoginPacket) {
    println!("🔐 [STREAM] Received login response");
    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID == 0 {
            println!("✅ [STREAM] Login successful");

            // 获取并保存FrontID和SessionID
            if let Some(rsp_user_login) = &packet.p_rsp_user_login {
                let front_id = rsp_user_login.FrontID;
                let session_id_num = rsp_user_login.SessionID;

                println!("🔑 [DEBUG] 登录获取到 FrontID: {}, SessionID: {}", front_id, session_id_num);

                // 保存当前会话的FrontID和SessionID
                {
                    let mut front_session_map = get_order_front_session_map().lock().unwrap();
                    front_session_map.insert(format!("session_{}", session_id), (front_id, session_id_num));
                }
            }

            let mut login_status = get_trader_login_status().lock().unwrap();
            login_status.insert(session_id.to_string(), true);

            // 更新登录结果
            let mut login_results = get_login_results().lock().unwrap();
            login_results.insert(session_id.to_string(), (true, "CTP登录成功".to_string()));

            // 发送交易登录成功事件到前端
            let app_handle_opt = {
                let handle = get_app_handle().lock().unwrap();
                handle.clone()
            };
            
            if let Some(app_handle) = app_handle_opt {
                let connection_data = serde_json::json!({
                    "type": "trader",
                    "status": "login_success",
                    "session_id": session_id.to_string(),
                    "timestamp": chrono::Local::now().to_rfc3339()
                });
                let _ = app_handle.emit("connection_status_change", &connection_data);
                println!("📤 [STREAM] 已发送交易登录成功事件: login_success");
            }

            // 登录成功后立即进行自动结算确认
            println!("🔍 [STREAM] 登录成功，开始自动结算确认...");
            let session_id_clone = session_id.to_string();
            tokio::spawn(async move {
                auto_settlement_confirm(&session_id_clone).await;
            });
        } else {
            println!("❌ [STREAM] Login failed: {}",
                ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg));

            // 更新登录结果
            let mut login_results = get_login_results().lock().unwrap();
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            login_results.insert(session_id.to_string(), (false, error_msg.to_string()));
            
            // 发送交易登录失败事件到前端
            let app_handle_opt = {
                let handle = get_app_handle().lock().unwrap();
                handle.clone()
            };
            
            if let Some(app_handle) = app_handle_opt {
                let connection_data = serde_json::json!({
                    "type": "trader",
                    "status": "login_failed",
                    "reason_text": error_msg.to_string(),
                    "session_id": session_id.to_string(),
                    "timestamp": chrono::Local::now().to_rfc3339()
                });
                let _ = app_handle.emit("connection_status_change", &connection_data);
                println!("📤 [STREAM] 已发送交易登录失败事件: login_failed");
            }
        }
    }
}

// 处理行情登录响应
fn handle_md_login_response(session_id: &str, packet: ctp_trade_lib::md_api::CThostFtdcMdSpiOnRspUserLoginPacket) {
    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID == 0 {
            let mut login_status = get_md_login_status().lock().unwrap();
            login_status.insert(session_id.to_string(), true);

            // 更新登录结果
            let mut login_results = get_login_results().lock().unwrap();
            login_results.insert(session_id.to_string(), (true, "CTP行情登录成功".to_string()));
            
            // 发送行情登录成功事件到前端
            let app_handle_opt = {
                let handle = get_app_handle().lock().unwrap();
                handle.clone()
            };
            
            if let Some(app_handle) = app_handle_opt {
                let connection_data = serde_json::json!({
                    "type": "md",
                    "status": "login_success",
                    "session_id": session_id.to_string(),
                    "timestamp": chrono::Local::now().to_rfc3339()
                });
                let _ = app_handle.emit("connection_status_change", &connection_data);
                println!("📤 [STREAM] 已发送行情登录成功事件: login_success");
            }
        } else {
            // 更新登录结果
            let mut login_results = get_login_results().lock().unwrap();
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            login_results.insert(session_id.to_string(), (false, error_msg.to_string()));
            
            // 发送行情登录失败事件到前端
            let app_handle_opt = {
                let handle = get_app_handle().lock().unwrap();
                handle.clone()
            };
            
            if let Some(app_handle) = app_handle_opt {
                let connection_data = serde_json::json!({
                    "type": "md",
                    "status": "login_failed",
                    "reason_text": error_msg.to_string(),
                    "session_id": session_id.to_string(),
                    "timestamp": chrono::Local::now().to_rfc3339()
                });
                let _ = app_handle.emit("connection_status_change", &connection_data);
                println!("📤 [STREAM] 已发送行情登录失败事件: login_failed");
            }
        }
    }
}

// 处理订单插入响应
fn handle_order_insert_response(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspOrderInsertPacket) {
    println!("📋 [STREAM] Received order insert response");
    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID == 0 {
            println!("✅ [STREAM] Order insert successful");
            if let Some(input_order) = &packet.p_input_order {
                let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order.OrderRef);
                let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order.InstrumentID);
                println!("📋 [STREAM] Order details - Ref: {}, Instrument: {}, Volume: {}, Price: {}",
                    order_ref, instrument_id, input_order.VolumeTotalOriginal, input_order.LimitPrice);

                // 保存成功结果
                let mut order_results = get_order_insert_results().lock().unwrap();
                order_results.insert(order_ref.trim_end_matches('\0').to_string(),
                    (true, "订单插入成功".to_string()));
            }
        } else {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Order insert failed: {}", error_msg);

            // 保存失败结果
            if let Some(input_order) = &packet.p_input_order {
                let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order.OrderRef);
                let mut order_results = get_order_insert_results().lock().unwrap();
                order_results.insert(order_ref.trim_end_matches('\0').to_string(),
                    (false, error_msg.to_string()));
            }
        }
    }
}

// 处理订单操作响应
fn handle_order_action_response(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspOrderActionPacket) {
    println!("📋 [STREAM] Received order action response");
    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID == 0 {
            println!("✅ [STREAM] Order action successful");
            if let Some(input_order_action) = &packet.p_input_order_action {
                let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order_action.OrderRef);
                let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order_action.InstrumentID);
                let action_ref = input_order_action.OrderActionRef;

                println!("📋 [STREAM] Cancel details - Ref: {}, Instrument: {}, Action: {}, ActionRef: {}",
                    order_ref, instrument_id, input_order_action.ActionFlag, action_ref);

                // 存储撤单成功结果
                let mut action_results = get_order_action_results().lock().unwrap();
                let action_ref_str = action_ref.to_string();
                let order_ref_clean = order_ref.trim_end_matches('\0').to_string();
                action_results.insert(action_ref_str, (true, "撤单成功".to_string(), order_ref_clean));
            }
        } else {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Order action failed: {}", error_msg);

            // 存储撤单失败结果
            if let Some(input_order_action) = &packet.p_input_order_action {
                let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&input_order_action.OrderRef);
                let action_ref = input_order_action.OrderActionRef;

                let mut action_results = get_order_action_results().lock().unwrap();
                let action_ref_str = action_ref.to_string();
                let order_ref_clean = order_ref.trim_end_matches('\0').to_string();
                action_results.insert(action_ref_str, (false, error_msg.to_string(), order_ref_clean));
            }
        }
    }
}

// 处理订单状态更新
fn handle_order_status_update(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRtnOrderPacket) {
    println!("📋 [STREAM] Received order status update");
    if let Some(order) = &packet.p_order {
        let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&order.OrderRef);
        let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&order.InstrumentID);
        let order_status = order.OrderStatus as u8;

        // 详细的订单状态说明
        let status_desc = match order_status {
            48 => "全部成交", // '0'
            49 => "部分成交还在队列中", // '1'
            50 => "部分成交不在队列中", // '2'
            51 => "未成交还在队列中", // '3'
            52 => "未成交不在队列中", // '4'
            53 => "撤单", // '5'
            97 => "未知", // 'a'
            98 => "尚未触发", // 'b'
            99 => "已触发", // 'c'
            _ => "其他状态",
        };

        println!("📋 [STREAM] Order update - Ref: {}, Instrument: {}, Status: {} ({}), Volume: {}/{}, FrontID: {}, SessionID: {}",
            order_ref, instrument_id, order_status, status_desc, order.VolumeTraded, order.VolumeTotal, order.FrontID, order.SessionID);

        // 特别关注状态49（部分成交）
        if order_status == 49 {
            println!("🔥 [STREAM] *** 检测到部分成交状态49！***");
            println!("🔥 [STREAM] 订单 {} 部分成交：{}/{} 手", order_ref, order.VolumeTraded, order.VolumeTotal);
        }

        // 发送订单状态更新事件到前端
        let app_handle_opt = {
            let handle = get_app_handle().lock().unwrap();
            handle.clone()
        };

        if let Some(app_handle) = app_handle_opt {
            let order_update_data = serde_json::json!({
                "order_ref": order_ref,
                "ref": order_ref,
                "instrument_id": instrument_id,
                "status": order_status,
                "status_desc": status_desc,
                "price": order.LimitPrice,
                "volume": order.VolumeTotal,
                "volume_traded": order.VolumeTraded,
                "direction": if order.Direction == '0' as i8 { "0" } else { "1" },
                "front_id": order.FrontID,
                "session_id": order.SessionID
            });

            // 根据窗口上下文发送精确通知
            let context_map = get_order_window_context().lock().unwrap();
            if let Some(window_context) = context_map.get(order_ref.as_ref()) {
                // 如果有窗口上下文，发送精确通知
                if let Some(window_id) = &window_context.window_id {
                    if !window_id.is_empty() {
                        // 发送到特定窗口
                        let event_name = format!("order_update_{}", window_id);
                        let _ = app_handle.emit(&event_name, &order_update_data);
                        println!("📤 [STREAM] 已发送订单状态更新事件到特定窗口 {}: {}", window_id, order_update_data);
                    } else {
                        // 空窗口ID表示账号全撤等操作，发送到所有相关合约的窗口
                        if let Some(contract_code) = &window_context.contract_code {
                            let event_name = format!("order_update_{}", contract_code);
                            let _ = app_handle.emit(&event_name, &order_update_data);
                            println!("📤 [STREAM] 已发送订单状态更新事件到合约 {}: {}", contract_code, order_update_data);
                        }
                    }
                } else {
                    // 没有窗口ID，但有合约代码，发送到合约相关窗口
                    if let Some(contract_code) = &window_context.contract_code {
                        let event_name = format!("order_update_{}", contract_code);
                        let _ = app_handle.emit(&event_name, &order_update_data);
                        println!("📤 [STREAM] 已发送订单状态更新事件到合约 {}: {}", contract_code, order_update_data);
                    }
                }
            } else {
                // 没有窗口上下文，使用传统的全局广播（向后兼容）
                let _ = app_handle.emit("order_update", &order_update_data);
                println!("📤 [STREAM] 已发送订单状态更新事件到前端（全局广播）: {}", order_update_data);
            }
        }

        // 保存订单的FrontID和SessionID，用于后续撤单
        {
            let mut front_session_map = get_order_front_session_map().lock().unwrap();
            front_session_map.insert(order_ref.to_string(), (order.FrontID, order.SessionID));
        }

        // 如果订单被拒绝或有错误，更新订单插入结果
        if order_status == 52 { // 未成交不在队列中 - 通常表示被拒绝
            let mut order_results = get_order_insert_results().lock().unwrap();
            let order_ref_clean = order_ref.trim_end_matches('\0').to_string();

            // 获取订单状态信息
            let status_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&order.StatusMsg);
            let error_msg = if status_msg.trim().is_empty() {
                format!("订单被拒绝 - 状态: {}", status_desc)
            } else {
                format!("订单被拒绝 - {}", status_msg)
            };

            order_results.insert(order_ref_clean, (false, error_msg));
            println!("❌ [STREAM] Order rejected - Ref: {}, Reason: {}", order_ref, status_msg);
        }
    }
}

// 处理成交通知
fn handle_trade_notification(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRtnTradePacket) {
    println!("💰 [STREAM] ========== Received trade notification ==========");
    println!("💰 [STREAM] OnRtnTrade event triggered!");
    if let Some(trade) = &packet.p_trade {
        let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&trade.OrderRef);
        let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&trade.InstrumentID);
        println!("💰 [STREAM] Trade executed - Ref: {}, Instrument: {}, Volume: {}, Price: {}",
            order_ref, instrument_id, trade.Volume, trade.Price);

        // 发送成交通知事件到前端
        let app_handle_opt = {
            let handle = get_app_handle().lock().unwrap();
            handle.clone()
        };

        if let Some(app_handle) = app_handle_opt {
            let trade_data = serde_json::json!({
                "order_ref": order_ref,
                "ref": order_ref,
                "instrument_id": instrument_id,
                "price": trade.Price,
                "volume": trade.Volume,
                "direction": if trade.Direction == '0' as i8 { "0" } else { "1" },
                "trade_id": ctp_trade_lib::gb18030_cstr_to_str_i8(&trade.TradeID),
                "trade_time": ctp_trade_lib::gb18030_cstr_to_str_i8(&trade.TradeTime)
            });

            // 根据窗口上下文发送精确通知
            let context_map = get_order_window_context().lock().unwrap();
            if let Some(window_context) = context_map.get(order_ref.as_ref()) {
                // 如果有窗口上下文，发送精确通知
                if let Some(window_id) = &window_context.window_id {
                    if !window_id.is_empty() {
                        // 发送到特定窗口
                        let event_name = format!("trade_notification_{}", window_id);
                        let _ = app_handle.emit(&event_name, &trade_data);
                        println!("📤 [STREAM] 已发送成交通知事件到特定窗口 {}: {}", window_id, trade_data);
                    } else {
                        // 空窗口ID表示账号全撤等操作，发送到所有相关合约的窗口
                        if let Some(contract_code) = &window_context.contract_code {
                            let event_name = format!("trade_notification_{}", contract_code);
                            let _ = app_handle.emit(&event_name, &trade_data);
                            println!("📤 [STREAM] 已发送成交通知事件到合约 {}: {}", contract_code, trade_data);
                        }
                    }
                } else {
                    // 没有窗口ID，但有合约代码，发送到合约相关窗口
                    if let Some(contract_code) = &window_context.contract_code {
                        let event_name = format!("trade_notification_{}", contract_code);
                        let _ = app_handle.emit(&event_name, &trade_data);
                        println!("📤 [STREAM] 已发送成交通知事件到合约 {}: {}", contract_code, trade_data);
                    }
                }
            } else {
                // 没有窗口上下文，使用传统的全局广播（向后兼容）
                let _ = app_handle.emit("trade_notification", &trade_data);
                println!("📤 [STREAM] 已发送成交通知事件到前端（全局广播）: {}", trade_data);
            }
        }
    }
}

// 处理订单查询响应
fn handle_order_query_response(session_id: &str, packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspQryOrderPacket) {
    // 只在查询开始时打印一次，避免每个订单都打印
    static mut QUERY_STARTED: bool = false;

    // 首先检查是否有错误信息
    let has_error = if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID != 0 {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Order query failed!");
            println!("  - ErrorID: {}", rsp_info.ErrorID);
            println!("  - ErrorMsg: {}", error_msg);
            true
        } else {
            false
        }
    } else {
        // 没有 rsp_info，这在CTP中是正常的（表示成功）
        false
    };

    // 如果没有错误，处理订单数据
    if !has_error {
        // 只在第一次收到响应时打印开始消息
        unsafe {
            if !QUERY_STARTED {
                println!("🔍 [STREAM] 开始接收订单查询响应 (session: {})...", session_id);
                QUERY_STARTED = true;
            }
        }

        if let Some(order) = &packet.p_order {
            let order_ref = ctp_trade_lib::gb18030_cstr_to_str_i8(&order.OrderRef);
            let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&order.InstrumentID);
            let order_status = order.OrderStatus as u8;
            let order_submit_status = order.OrderSubmitStatus as u8;

            // 扩展未成交订单的状态范围，包括：
            // 97 = 未知状态（刚提交）- 'a'
            // 51 = 未成交还在队列中 - '3'
            // 49 = 部分成交还在队列中 - '1'
            // 50 = 部分成交不在队列中 - '2' ⭐ 新增
            // 52 = 未成交不在队列中 - '4' ⭐ 新增
            // 98 = 尚未触发 - 'b' (条件单)
            //
            // 同时检查订单提交状态：
            // 48 = 已经提交 - '0'
            // 49 = 撤单已经提交 - '1'
            // 51 = 已经接受 - '3'
            // 52 = 报单已经被拒绝 - '4'

            // 判断是否为可撤销的订单
            let is_cancelable = match order_status {
                97 | 51 | 49 | 50 | 52 | 98 => true,  // 未知、未成交(在队列/不在队列)、部分成交(在队列/不在队列)、尚未触发
                _ => false
            };

            // 同时检查提交状态，排除已被拒绝的订单
            let is_valid_submit_status = order_submit_status != 52; // 52 = 报单已经被拒绝

            if is_cancelable && is_valid_submit_status {
                let direction = if order.Direction == 48 { "买入".to_string() } else { "卖出".to_string() };
                let status_desc = get_order_status_desc(order_status);

                let order_info = OrderInfo {
                    order_ref: order_ref.to_string(),
                    instrument_id: instrument_id.to_string(),
                    direction,
                    price: order.LimitPrice,
                    volume: order.VolumeTotalOriginal,
                    volume_traded: order.VolumeTraded,
                    order_status: status_desc.to_string(),
                    front_id: order.FrontID,
                    session_id: order.SessionID,
                };

                // 修复：直接添加到当前session的查询结果
                let mut queried_orders = get_queried_orders().lock().unwrap();

                // 确保该session的订单列表存在
                if !queried_orders.contains_key(session_id) {
                    queried_orders.insert(session_id.to_string(), Vec::new());
                }

                // 添加到当前session
                if let Some(orders) = queried_orders.get_mut(session_id) {
                    orders.push(order_info.clone());
                }
            }
        }

        if packet.b_is_last {
            // 重置查询开始标志，为下次查询做准备
            unsafe {
                QUERY_STARTED = false;
            }
        }
    }
}

// 辅助函数：获取订单状态描述
fn get_order_status_desc(order_status: u8) -> &'static str {
    match order_status {
        48 => "全部成交",           // '0'
        49 => "部分成交还在队列中",  // '1'
        50 => "部分成交不在队列中",  // '2'
        51 => "未成交还在队列中",    // '3'
        52 => "未成交不在队列中",    // '4'
        53 => "撤单",               // '5'
        97 => "未知",               // 'a'
        98 => "尚未触发",           // 'b'
        99 => "已触发",             // 'c'
        _ => "其他状态"
    }
}

// 处理结算确认响应
fn handle_settlement_confirm_response(session_id: &str, packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspSettlementInfoConfirmPacket) {
    println!("📋 [STREAM] Received settlement confirm response");
    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID == 0 {
            println!("✅ [STREAM] Settlement confirm successful");
            let mut confirm_status = get_settlement_confirm_status().lock().unwrap();
            confirm_status.insert(session_id.to_string(), true);

            // 更新结算确认结果
            let mut confirm_results = get_settlement_confirm_results().lock().unwrap();
            confirm_results.insert(session_id.to_string(), (true, "结算结果确认成功".to_string()));
        } else {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Settlement confirm failed: {}", error_msg);

            // 更新结算确认结果
            let mut confirm_results = get_settlement_confirm_results().lock().unwrap();
            confirm_results.insert(session_id.to_string(), (false, error_msg.to_string()));
        }
    }
}

// 处理CTP合约查询回调
pub fn handle_ctp_instrument_callback(instrument: &ctp_trade_lib::CThostFtdcInstrumentField, is_last: bool) {
    // 将CTP合约数据转换为我们的InstrumentInfo格式
    let instrument_info = InstrumentInfo {
        instrument_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.InstrumentID).trim_end_matches('\0').to_string(),
        exchange_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.ExchangeID).trim_end_matches('\0').to_string(),
        instrument_name: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.InstrumentName).to_string(),
        exchange_inst_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.ExchangeInstID).trim_end_matches('\0').to_string(),
        product_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.ProductID).trim_end_matches('\0').to_string(),
        product_class: format!("{}", instrument.ProductClass as u8),
        delivery_year: instrument.DeliveryYear,
        delivery_month: instrument.DeliveryMonth,
        max_market_order_volume: instrument.MaxMarketOrderVolume,
        min_market_order_volume: instrument.MinMarketOrderVolume,
        max_limit_order_volume: instrument.MaxLimitOrderVolume,
        min_limit_order_volume: instrument.MinLimitOrderVolume,
        volume_multiple: instrument.VolumeMultiple,
        price_tick: instrument.PriceTick,
        create_date: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.CreateDate).trim_end_matches('\0').to_string(),
        open_date: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.OpenDate).trim_end_matches('\0').to_string(),
        expire_date: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.ExpireDate).trim_end_matches('\0').to_string(),
        start_deliv_date: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.StartDelivDate).trim_end_matches('\0').to_string(),
        end_deliv_date: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.EndDelivDate).trim_end_matches('\0').to_string(),
        inst_life_phase: format!("{}", instrument.InstLifePhase as u8),
        is_trading: instrument.IsTrading,
        position_type: format!("{}", instrument.PositionType as u8),
        position_date_type: format!("{}", instrument.PositionDateType as u8),
        long_margin_ratio: instrument.LongMarginRatio,
        short_margin_ratio: instrument.ShortMarginRatio,
        max_margin_side_algorithm: format!("{}", instrument.MaxMarginSideAlgorithm as u8),
        underlying_inst_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&instrument.UnderlyingInstrID).trim_end_matches('\0').to_string(),
        strike_price: instrument.StrikePrice,
        options_type: format!("{}", instrument.OptionsType as u8),
        underlying_multiple: instrument.UnderlyingMultiple,
        combination_type: format!("{}", instrument.CombinationType as u8),
    };

    // 添加到全局合约数据中
    {
        let mut instruments_data = get_ctp_instruments_data().lock().unwrap();
        instruments_data.push(instrument_info);
    }

    // 如果是最后一个包，标记查询完成
    if is_last {
        let mut query_complete = get_instruments_query_complete().lock().unwrap();
        *query_complete = true;

        let instruments_count = {
            let instruments_data = get_ctp_instruments_data().lock().unwrap();
            instruments_data.len()
        };

        println!("✅ [STREAM] Instrument query completed, {} instruments loaded", instruments_count);
    }
}

// 处理CTP行情数据回调
pub fn handle_ctp_market_data_callback(market_data: &ctp_trade_lib::CThostFtdcDepthMarketDataField, _session_id: &str) {
    // 将CTP行情数据转换为我们的MarketDataInfo格式
    let instrument_id = ctp_trade_lib::gb18030_cstr_to_str_i8(&market_data.InstrumentID).trim_end_matches('\0').to_string();

    // 创建行情数据结构
    let market_data_info = serde_json::json!({
        "instrument_id": instrument_id,
        "last_price": if market_data.LastPrice != f64::MAX { market_data.LastPrice } else { 0.0 },
        "volume": market_data.Volume,
        "turnover": market_data.Turnover,
        "open_interest": market_data.OpenInterest,
        "pre_close_price": if market_data.PreClosePrice != f64::MAX { market_data.PreClosePrice } else { 0.0 },
        "pre_settlement_price": if market_data.PreSettlementPrice != f64::MAX { market_data.PreSettlementPrice } else { 0.0 },
        "pre_open_interest": market_data.PreOpenInterest,
        "open_price": if market_data.OpenPrice != f64::MAX { market_data.OpenPrice } else { 0.0 },
        "highest_price": if market_data.HighestPrice != f64::MAX { market_data.HighestPrice } else { 0.0 },
        "lowest_price": if market_data.LowestPrice != f64::MAX { market_data.LowestPrice } else { 0.0 },
        "upper_limit_price": if market_data.UpperLimitPrice != f64::MAX { market_data.UpperLimitPrice } else { 0.0 },
        "lower_limit_price": if market_data.LowerLimitPrice != f64::MAX { market_data.LowerLimitPrice } else { 0.0 },
        "settlement_price": if market_data.SettlementPrice != f64::MAX { market_data.SettlementPrice } else { 0.0 },
        "currency_id": "CNY".to_string(), // Default currency for Chinese futures market
        // 5档买盘数据
        "bid_price1": if market_data.BidPrice1 != f64::MAX { market_data.BidPrice1 } else { 0.0 },
        "bid_volume1": market_data.BidVolume1,
        "bid_price2": if market_data.BidPrice2 != f64::MAX { market_data.BidPrice2 } else { 0.0 },
        "bid_volume2": market_data.BidVolume2,
        "bid_price3": if market_data.BidPrice3 != f64::MAX { market_data.BidPrice3 } else { 0.0 },
        "bid_volume3": market_data.BidVolume3,
        "bid_price4": if market_data.BidPrice4 != f64::MAX { market_data.BidPrice4 } else { 0.0 },
        "bid_volume4": market_data.BidVolume4,
        "bid_price5": if market_data.BidPrice5 != f64::MAX { market_data.BidPrice5 } else { 0.0 },
        "bid_volume5": market_data.BidVolume5,
        // 5档卖盘数据
        "ask_price1": if market_data.AskPrice1 != f64::MAX { market_data.AskPrice1 } else { 0.0 },
        "ask_volume1": market_data.AskVolume1,
        "ask_price2": if market_data.AskPrice2 != f64::MAX { market_data.AskPrice2 } else { 0.0 },
        "ask_volume2": market_data.AskVolume2,
        "ask_price3": if market_data.AskPrice3 != f64::MAX { market_data.AskPrice3 } else { 0.0 },
        "ask_volume3": market_data.AskVolume3,
        "ask_price4": if market_data.AskPrice4 != f64::MAX { market_data.AskPrice4 } else { 0.0 },
        "ask_volume4": market_data.AskVolume4,
        "ask_price5": if market_data.AskPrice5 != f64::MAX { market_data.AskPrice5 } else { 0.0 },
        "ask_volume5": market_data.AskVolume5,
        "update_time": ctp_trade_lib::gb18030_cstr_to_str_i8(&market_data.UpdateTime).trim_end_matches('\0').to_string(),
        "update_millisec": market_data.UpdateMillisec,
        "action_day": ctp_trade_lib::gb18030_cstr_to_str_i8(&market_data.ActionDay).trim_end_matches('\0').to_string(),
    });

    // 发送行情数据事件到前端
    let app_handle_opt = {
        let handle = get_app_handle().lock().unwrap();
        handle.clone()
    };

    if let Some(app_handle) = app_handle_opt {
        // 发送market_data事件到前端
        let _ = app_handle.emit("market_data", &market_data_info);
    }
}

// 自动结算确认函数
async fn auto_settlement_confirm(session_id: &str) {
    use crate::ctp_commands::ctp::auth::settlement_info_confirm;

    println!("🔍 [AUTO] 开始自动结算确认，session_id: {}", session_id);

    // 等待一小段时间，确保登录完全完成
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

    // 检查结算确认状态
    let confirm_status = {
        let confirm_status = get_settlement_confirm_status().lock().unwrap();
        confirm_status.get(session_id).copied().unwrap_or(false)
    };

    if confirm_status {
        println!("✅ [AUTO] 结算已确认，无需重复确认");
        return;
    }

    println!("🔍 [AUTO] 结算未确认，开始自动确认...");

    // 调用结算确认
    let result = settlement_info_confirm(session_id.to_string());

    match result {
        crate::ctp_commands::ctp::types::ApiResponse { success: true, data: Some(message), .. } => {
            println!("✅ [AUTO] 自动结算确认成功: {}", message);
        },
        crate::ctp_commands::ctp::types::ApiResponse { success: false, error: Some(error), .. } => {
            println!("❌ [AUTO] 自动结算确认失败: {}", error);
        },
        _ => {
            println!("⚠️ [AUTO] 自动结算确认返回未知结果");
        }
    }
}

// 处理持仓查询响应
fn handle_position_query_response(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspQryInvestorPositionPacket) {
    use crate::ctp_commands::ctp::state::{get_ctp_positions_data, get_positions_query_complete};
    use crate::ctp_commands::ctp::types::PositionInfo;

    println!("📊 [STREAM] Received position query response");

    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID != 0 {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Position query failed: {}", error_msg);

            // 如果是最后一个包，标记查询完成
            if packet.b_is_last {
                let mut query_complete = get_positions_query_complete().lock().unwrap();
                *query_complete = true;
            }
            return;
        }
    }

    // 处理持仓数据
    if let Some(position) = &packet.p_investor_position {
        let position_info = PositionInfo {
            instrument_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.InstrumentID).trim_end_matches('\0').to_string(),
            broker_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.BrokerID).trim_end_matches('\0').to_string(),
            investor_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.InvestorID).trim_end_matches('\0').to_string(),
            posi_direction: format!("{}", position.PosiDirection as u8),
            hedge_flag: format!("{}", position.HedgeFlag as u8),
            position_date: format!("{}", position.PositionDate as u8),
            yd_position: position.YdPosition,
            position: position.Position,
            long_frozen: position.LongFrozen,
            short_frozen: position.ShortFrozen,
            long_frozen_amount: position.LongFrozenAmount,
            short_frozen_amount: position.ShortFrozenAmount,
            open_volume: position.OpenVolume,
            close_volume: position.CloseVolume,
            open_amount: position.OpenAmount,
            close_amount: position.CloseAmount,
            position_cost: position.PositionCost,
            pre_margin: position.PreMargin,
            use_margin: position.UseMargin,
            frozen_margin: position.FrozenMargin,
            frozen_cash: position.FrozenCash,
            frozen_commission: position.FrozenCommission,
            cash_in: position.CashIn,
            commission: position.Commission,
            close_profit: position.CloseProfit,
            position_profit: position.PositionProfit,
            pre_settlement_price: position.PreSettlementPrice,
            settlement_price: position.SettlementPrice,
            trading_day: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.TradingDay).trim_end_matches('\0').to_string(),
            settlement_id: position.SettlementID,
            open_cost: position.OpenCost,
            exchange_margin: position.ExchangeMargin,
            comb_position: position.CombPosition,
            comb_long_frozen: position.CombLongFrozen,
            comb_short_frozen: position.CombShortFrozen,
            close_profit_by_date: position.CloseProfitByDate,
            close_profit_by_trade: position.CloseProfitByTrade,
            today_position: position.TodayPosition,
            margin_rate_by_money: position.MarginRateByMoney,
            margin_rate_by_volume: position.MarginRateByVolume,
            strike_frozen: position.StrikeFrozen,
            strike_frozen_amount: position.StrikeFrozenAmount,
            abandon_frozen: position.AbandonFrozen,
            exchange_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.ExchangeID).trim_end_matches('\0').to_string(),
            yd_strike_frozen: position.YdStrikeFrozen,
            invest_unit_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&position.InvestUnitID).trim_end_matches('\0').to_string(),
            position_cost_offset: position.PositionCostOffset,
            tas_position: position.TasPosition,
            tas_position_cost: position.TasPositionCost,
        };

        // 添加到持仓数据列表
        {
            let mut positions_data = get_ctp_positions_data().lock().unwrap();
            positions_data.push(position_info);
        }

        println!("📊 [STREAM] Added position: {} {} {} hands",
            ctp_trade_lib::gb18030_cstr_to_str_i8(&position.InstrumentID).trim_end_matches('\0'),
            if position.PosiDirection as u8 == 2 { "多头" } else { "空头" },
            position.Position
        );
    }

    // 如果是最后一个包，标记查询完成
    if packet.b_is_last {
        let mut query_complete = get_positions_query_complete().lock().unwrap();
        *query_complete = true;

        let positions_count = {
            let positions_data = get_ctp_positions_data().lock().unwrap();
            positions_data.len()
        };

        println!("✅ [STREAM] Position query completed, {} positions loaded", positions_count);
    }
}

// 处理账户查询响应
fn handle_account_query_response(packet: ctp_trade_lib::trader_api::CThostFtdcTraderSpiOnRspQryTradingAccountPacket) {
    use crate::ctp_commands::ctp::state::{get_ctp_account_data, get_account_query_complete};
    use crate::ctp_commands::ctp::types::AccountInfo;

    println!("💰 [STREAM] Received account query response");

    if let Some(rsp_info) = &packet.p_rsp_info {
        if rsp_info.ErrorID != 0 {
            let error_msg = ctp_trade_lib::gb18030_cstr_to_str_i8(&rsp_info.ErrorMsg);
            println!("❌ [STREAM] Account query failed: {}", error_msg);

            // 标记查询完成（即使失败）
            let mut query_complete = get_account_query_complete().lock().unwrap();
            *query_complete = true;
            return;
        }
    }

    // 处理账户数据
    if let Some(account) = &packet.p_trading_account {
        let account_info = AccountInfo {
            broker_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&account.BrokerID).trim_end_matches('\0').to_string(),
            account_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&account.AccountID).trim_end_matches('\0').to_string(),
            pre_mortgage: account.PreMortgage,
            pre_credit: account.PreCredit,
            pre_deposit: account.PreDeposit,
            pre_balance: account.PreBalance,
            pre_margin: account.PreMargin,
            interest_base: account.InterestBase,
            interest: account.Interest,
            deposit: account.Deposit,
            withdraw: account.Withdraw,
            frozen_margin: account.FrozenMargin,
            frozen_cash: account.FrozenCash,
            frozen_commission: account.FrozenCommission,
            curr_margin: account.CurrMargin,
            cash_in: account.CashIn,
            commission: account.Commission,
            close_profit: account.CloseProfit,
            position_profit: account.PositionProfit,
            balance: account.Balance,
            available: account.Available,
            withdraw_quota: account.WithdrawQuota,
            reserve: account.Reserve,
            trading_day: ctp_trade_lib::gb18030_cstr_to_str_i8(&account.TradingDay).trim_end_matches('\0').to_string(),
            settlement_id: account.SettlementID,
            credit: account.Credit,
            mortgage: account.Mortgage,
            exchange_margin: account.ExchangeMargin,
            delivery_margin: account.DeliveryMargin,
            exchange_delivery_margin: account.ExchangeDeliveryMargin,
            reserve_balance: account.ReserveBalance,
            currency_id: ctp_trade_lib::gb18030_cstr_to_str_i8(&account.CurrencyID).trim_end_matches('\0').to_string(),
            pre_fund_mortgage_in: account.PreFundMortgageIn,
            pre_fund_mortgage_out: account.PreFundMortgageOut,
            fund_mortgage_in: account.FundMortgageIn,
            fund_mortgage_out: account.FundMortgageOut,
            fund_mortgage_available: account.FundMortgageAvailable,
            mortgage_able_fund: account.MortgageableFund,
            spec_product_margin: account.SpecProductMargin,
            spec_product_frozen_margin: account.SpecProductFrozenMargin,
            spec_product_commission: account.SpecProductCommission,
            spec_product_frozen_commission: account.SpecProductFrozenCommission,
            spec_product_position_profit: account.SpecProductPositionProfit,
            spec_product_close_profit: account.SpecProductCloseProfit,
            spec_product_position_profit_by_alg: account.SpecProductPositionProfitByAlg,
            spec_product_exchange_margin: account.SpecProductExchangeMargin,
            bis_margin: 0.0, // CTP API中不存在BizMargin字段
            bis_frozen_margin: 0.0, // CTP API中不存在BizFrozenMargin字段
            bis_commission: 0.0, // CTP API中不存在BizCommission字段
            bis_frozen_commission: 0.0, // CTP API中不存在BizFrozenCommission字段
            bis_position_profit: 0.0, // CTP API中不存在BizPositionProfit字段
            bis_close_profit: 0.0, // CTP API中不存在BizCloseProfit字段
            bis_position_profit_by_alg: 0.0, // CTP API中不存在BizPositionProfitByAlg字段
            bis_exchange_margin: 0.0, // CTP API中不存在BizExchangeMargin字段
            frozen_swap: account.FrozenSwap,
            remain_swap: account.RemainSwap,
        };

        // 保存账户数据
        {
            let mut account_data = get_ctp_account_data().lock().unwrap();
            *account_data = Some(account_info.clone());
        }

        println!("💰 [STREAM] Account data loaded: balance={:.2}, available={:.2}, margin={:.2}",
            account_info.balance,
            account_info.available,
            account_info.curr_margin
        );
    }

    // 标记查询完成
    let mut query_complete = get_account_query_complete().lock().unwrap();
    *query_complete = true;
    println!("✅ [STREAM] Account query completed");
}

